AkkaのActorでBroadcast


概要

AkkaのActorは、ScalaのActorsと異なり、ActorSystemという根幹システムを持っている。


これによって、Actor間を跨いだコントロールや、システム同士のコントロールが可能になっている。


Actors(2.9.x以前のScalaのActor)だと、この辺が無かったので、

Broadcast = メッセージが全Actorに届く とかしたい場合、自前でobject置くとかして対処してた。


要は、機構として、Centralみたいなものが無くて、自前で実装しなきゃいけなかった、と。


その辺、1WくらいでざっとJavaから使えるようにしたのがこの辺

https://github.com/sassembla/ScalaMessengerPrototype


で、Akkaで書き直してJava?なにそれ?な感じになったものを使ってはや三ヶ月。

月日が経つのは速い。

もうActorsに関しては何もかも無かった事にしていいと思う。

で、



AkkaのActorでBroadcastな仕組みを作ってみる

例として今回のお題、複数のActor間でメッセージを解釈したい場合、


AkkaのActorでは、EventBusを使ってpub-subが実装できる。

参考:http://doc.akka.io/docs/akka/2.1-M2/scala/event-bus.html



(Akka Router のBroadcastはこれとは異なった用途に使用する)

参考:http://doc.akka.io/docs/akka/snapshot/scala/routing.html


こんな感じ。

SampleBroadcasterMain.scala

import akka.actor._


case class Message(body:String)


class SampleActor extends Actor {

  import java.util.UUID


  val myId = UUID.randomUUID.toString


  def receive = {

    case message:Message => println("I am "+myId+" /message:"+message.body)    

  }

}



object SampleBroadcasterMain {

  def main(args: Array[String]) = {


   //Genereate system

    val system = ActorSystem("namespace")


    //Create actor

    val sub1 = system.actorOf(Props[SampleActor])


    //add actor to system-subscriber's network

    system.eventStream.subscribe(sub1, classOf[Message])


    //add another one -the 2nd

    system.eventStream.subscribe(system.actorOf(Props[SampleActor]), classOf[Message])


    //and add another one -the 3rd

    system.eventStream.subscribe(system.actorOf(Props[SampleActor]), classOf[Message])


    //publish messeage from here to the all subscribers.

    system.eventStream.publish(Message("hereComes! subscrivers!!"))

  }

}



実行すると

17:06:50.341 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] I am 77175ffd-9823-4444-96ac-7dc6e7d250e4 /message:hereComes! subscrivers!!

17:06:50.342 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] I am 6166563a-c421-4ae4-a820-8fbdec895e2e /message:hereComes! subscrivers!!

17:06:50.343 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] I am 88b7d855-b0fa-4706-87fd-558760876024 /message:hereComes! subscrivers!!


見事に、登録したactorすべてにMessageオブジェクトが投げられている。

らっくーーー。


eventBusは、既存の用途としてDeadLetterとかに使っている。

それを外側からも使えるように、参加可能なようにしてある、という。

なるほど的な感じ。



他にも何が楽か

Logとかが楽になっています。

たとえばLoggerみたいなものを個別に積んだり、


LoggerActorみたいなのを作ってCentralPointみたいなものを自前で作ったりしてたのですが、


Akkaになると、ActorSystemに対してのLoggingの受け皿が取り付け可能なので、あとからLogをつけたり外したりがとても容易になりました。

全体像としては、


・LogActorをActorへとmixin + log出力するコードをActorの中に書く

・systemへと



systemへとlogListenerを追加

    val logListener = system.actorOf(Props[SampleLogListener])


と、

logを入力したいActorへとtraitをmixim

class SampleActor extends Actor with ActorLogging {

  import java.util.UUID


  val myId = UUID.randomUUID.toString


  def receive = {

    case message:Message => {

      println("I am "+myId+" /message:"+message.body)

      log.info("I am "+myId+" /message:"+message.body)

    }

  }

}



で、LogListenerの本体はこんな感じ

class SampleLogListener extends Actor {

  //log

  import akka.event.Logging.InitializeLogger

  import akka.event.Logging.LoggerInitialized

  import akka.event.Logging.Error

  import akka.event.Logging.Warning

  import akka.event.Logging.Info

  import akka.event.Logging.Debug

  

  def receive = {

    case InitializeLogger(_)                        print("initilaized")

    case Error(cause, logSource, logClass, message) print("Err  " + message)

    case Warning(logSource, logClass, message)      print("War  " + message)

    case Info(logSource, logClass, message)         print("Inf  " + message)

    case Debug(logSource, logClass, message)        print("Deb  " + message)

  }

}


ってな感じなので、後付けもらくちん。


ログはこんな感じにでます。

18:25:55.925 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] [INFO] [01/20/2013 18:25:55.845] [namespace-akka.actor.default-dispatcher-4] [akka://namespace/user/$b] I am 77175ffd-9823-4444-96ac-7dc6e7d250e4 /message:hereComes! subscrivers!!

18:25:55.926 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] [INFO] [01/20/2013 18:25:55.849] [namespace-akka.actor.default-dispatcher-2] [akka://namespace/user/$d] I am 6166563a-c421-4ae4-a820-8fbdec895e2e /message:hereComes! subscrivers!!

18:25:55.926 [INFO] [org.gradle.api.internal.project.ant.AntLoggingAdapter] [ant:java] [INFO] [01/20/2013 18:25:55.850] [namespace-akka.actor.default-dispatcher-3] [akka://namespace/user/$c] I am 88b7d855-b0fa-4706-87fd-558760876024 /message:hereComes! subscrivers!!

ここにfluentdとかへのつなぎ込みのコードを書けば、、、

あとは、、、わかるな、、?


おまけ ActorSystemへようこそ

logの中に、akka://namespace/user/ とかが出ました。


そう、このへんは、ActorSystemの名称なんです。さっき決めたやつ。

userから先に、actorが設置されている。


Supervisorとかの話につながる、ActorSystemの全体像が、この辺に関わる感じ。

詳しくは

http://doc.akka.io/docs/akka/2.0/general/actor-systems.html



サンプルとか

サンプルは下記にupしますた。

https://github.com/sassembla/AkkaSampleBroadcaster


フォルダで、

gradlew runJar -d とかやると、稼働する筈。